package com.ocss.ocsscustomerserver.custom;


import com.alibaba.fastjson.JSONObject;
import com.ocss.ocsscommon.message.ClientMessage;
import com.ocss.ocsscommon.message.KafkaMessage;
import com.ocss.ocsscommon.message.KafkaVisitorAssignMessage;
import com.ocss.ocsscustomerserver.tools.MessagePost;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

import java.io.IOException;

/**
 * @Auther: lijiang
 * @Date: 2023-04-17 20:06
 * @Description: MessageCustomer
 */

@Component
public class KafkaConsumer {

    @Autowired
    MessagePost messagePost;
    // 消费监听
    @KafkaListener(id = "consumer1",groupId = "my-group1", topics = {"customerSend"})
    public void listen1(String data) throws IOException {
        System.out.println(data);
       KafkaMessage kafkaMessage =JSONObject.parseObject(data, KafkaMessage.class);
        ClientMessage clientMessage = ClientMessage.builder()
                .to(kafkaMessage.getReceiveId())
                .from(kafkaMessage.getSendId())
                .type(kafkaMessage.getType())
                .data(kafkaMessage.getData()).build();
        messagePost.sendMessage(clientMessage);
    }
    @KafkaListener(id = "consumer2",groupId = "my-group1", topics = {"assignComplete"})
    public void listenEstablish(String data) throws IOException {
        System.out.println(data);
        KafkaVisitorAssignMessage establishReturn =JSONObject.parseObject(data, KafkaVisitorAssignMessage.class);
        KafkaVisitorAssignMessage establish = KafkaVisitorAssignMessage.builder()
                .Visitor(establishReturn.getVisitor())
                .commendType(establishReturn.getCommendType())
                .Customer(establishReturn.getCustomer())
                .build();
        messagePost.sendEstablish(establish);
    }
}


